Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-20625][pubsub,e2e] Add PubSubSource connector using FLIP-27 #2

Open
wants to merge 8 commits into
base: main
Choose a base branch
from

Conversation

RyanSkraba
Copy link

What is the purpose of the change

Brief change log

Verifying this change

Please make sure both new and modified tests in this PR follows the conventions defined in our code quality guide: https://flink.apache.org/contributing/code-style-and-quality-common.html#testing

This change added tests and can be verified as follows:

  • Adds end-to-end test using the Pub/Sub emulator.

(example:)

  • Added integration tests for end-to-end deployment with large payloads (100MB)
  • Extended integration test for recovery after master (JobManager) failure
  • Added test that validates that TaskInfo is transferred only once across recoveries
  • Manually verified the change by running a 4 node cluser with 2 JobManagers and 4 TaskManagers, a stateful streaming program, and killing one JobManager and two TaskManagers during the execution, verifying that recovery happens correctly.

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): no
  • The public API, i.e., is any changed class annotated with @Public(Evolving): no
  • The serializers: no
  • The runtime per-record code paths (performance sensitive): no
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
  • The S3 file system connector: no

Documentation

  • Does this pull request introduce a new feature? yes
  • If yes, how is the feature documented? docs / JavaDocs

@boring-cyborg
Copy link

boring-cyborg bot commented Jan 26, 2023

Thanks for opening this pull request! Please check out our contributing guidelines. (https://flink.apache.org/contributing/how-to-contribute.html)

JakobEdding and others added 5 commits June 28, 2023 09:07
* WIP

* WIP

* Working WIP

* Clean up

* Place new Pub/Sub source into existing Pub/Sub connector module

* Clean up

* Apply Spotless code formatting

[FLINK-20625][pubsub,e2e] Attempt to support stopping the reader when stopmark is encountered

[FLINK-20625][pubsub,e2e] Add checkpointing and do some refactorings

* Simplify fetching from Pub/Sub in SplitReader

* Allow Pub/Sub source to be only continuous unbounded

* Add basic PubSubSource builder

* Add configuration options for SubscriberFactory to PubSubSource, remove unused collector

* Add checkpointing

[FLINK-20625][pubsub,e2e] Allow multiple records inside single Pub/Sub message for deserialization

[FLINK-20625][pubsub,e2e] Add Javadocs, README and clean up

[FLINK-20625][pubsub,e2e] Reduce visibility of classes and their members

[FLINK-20625][pubsub,e2e] Propagate Pub/Sub subscriber creation errors from SplitReader

[FLINK-20625][pubsub,e2e] Use constants for default Pub/Sub subscriber parameters

[FLINK-20625][pubsub,e2e] Fix dynamic Scala version in artifact example

[FLINK-20625][pubsub,e2e] Rename PubSubEnumeratorCheckpoint -> PubSubEnumeratorState

[FLINK-20625][pubsub,e2e] Add version checks for deserialization

[FLINK-20625][pubsub,e2e] Remove unnecessary declaration of exception-throwing

[FLINK-20625][pubsub,e2e] Remove disfunctional end-of-stream logic

[FLINK-20625][pubsub,e2e] Avoid concurrency issues with list of Pub/Sub messages to acknowledge

[FLINK-20625][pubsub,e2e] Refactor PubSubSourceBuilder

[FLINK-20625][pubsub,e2e] Clarify consistency guarantee description

[FLINK-20625][pubsub,e2e] Clarify Pub/Sub request timeout

[FLINK-20625][pubsub,e2e] Restructure and extend readme, add basic architecture info to docstring

[FLINK-20625][pubsub,e2e] Attempt to solve concurrency issues with checkpointing
@MartijnVisser MartijnVisser force-pushed the rskraba/FLINK-20625-pubsub-PR18823 branch from 7813345 to 3b33d92 Compare June 28, 2023 07:10
@MartijnVisser
Copy link
Contributor

@RyanSkraba Do you have any idea who could help with a review here?

@XComp
Copy link

XComp commented Oct 4, 2023

@dchristle how about you? 😉 Would you have time to look at this?

@dchristle
Copy link

Hi @RyanSkraba,

Thank you for shepherding and improving this connector implementation over the years. I'm hoping to help review & improve it so we can finally get it merged. I just opened RyanSkraba#1 to add some improvements based on my experience running this code in production. Our internal implementation has some other improvements, but they need to be properly cleaned up & can come in a later PR, as they aren't essential. Please take a look when you get a chance.

dchristle and others added 3 commits November 13, 2023 13:49
* Add retry mechanism to acknowledging pubsub messages

* Remove synchronized locks and instead decouple reader and fetcher threads with ArrayBlockingQueue. This fixes a concurrency bug between separate threads calling fetch/acknowledgeMessages.

* Handle failed checkpoints and ensure subscriber is not null in acknowledgeMessages.

* Implement splitAckIds with iteration instead of recursion. Fixes bug where payload can exceed maxPayload.

---------

Co-authored-by: Richard Deurwaarder <[email protected]>
@RyanSkraba
Copy link
Author

Just a quick update on this PR! Thanks so much @dchristle for the review and the improvements -- I've included everything here in the PR and added you as a collaborator to this branch. Please feel free to apply any additional changes that you've learned from your use in production! That's really the most valuable source of info for a new connector.

I've applied the changes you've requested, as well as some other very minor code clean-ups suggested through the IDE (some unused exceptions, another missing Java generic and using a deprecated super-constructor).

@snuyanzin
Copy link
Contributor

@dchristle thanks for the review
May I ask you to have another iteration review for this PR?

Copy link
Contributor

@vahmed-hamdy vahmed-hamdy left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have a general question, if we are delaying acks to checkpoints, how are we going to handle if checkpoint duration is bigger than ack deadline on pubsub source, also we are having multiple pull requests ahead of acking, this causes multiple duplicates IIUC. This is not tested in IT tests added.

implements Source<OUT, PubSubSplit, PubSubEnumeratorState>, ResultTypeQueryable<OUT> {
private final PubSubDeserializationSchema<OUT> deserializationSchema;
private final PubSubSubscriberFactory pubSubSubscriberFactory;
private final Properties props;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: rename props, is t client props, reader props,....?

private String subscriptionName;
private PubSubSubscriberFactory pubSubSubscriberFactory;
private Properties props;
private Credentials credentials;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we should pass credential provider instead, to natively support dynamic credentials discovery and refreshing.

new DefaultPubSubSubscriberFactory(
ProjectSubscriptionName.format(projectName, subscriptionName),
DEFAULT_PUBSUB_SUBSCRIBER_NUMBER_OF_RETRIES,
Duration.ofSeconds(15),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: why not expose as default like the others?

*
* @param <OUT> The output type of the source.
*/
public class PubSubSource<OUT>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

missing tag, @PublicEvolving or @Public

* The enumerator for the {@link PubSubSource}. It does not do any work discovery as envisioned by
* FLIP-27 because GCP Pub/Sub hides partitions and other implementation details.
*/
public class PubSubSourceEnumerator implements SplitEnumerator<PubSubSplit, PubSubEnumeratorState> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

missing annotation, @Internal

FutureCompletingBlockingQueue<RecordsWithSplitIds<Tuple2<T, Long>>> elementsQueue,
Supplier<SplitReader<Tuple2<T, Long>, PubSubSplit>> splitReaderSupplier,
Configuration config) {
super(elementsQueue, splitReaderSupplier, config);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Deprecated constructor

}
}

private void enqueuePrepareForAcknowledgementTask(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We are calling this on onCheckpointComplete where we enqueue a task that would possibly not start immediately. What happens if we fail during acking.
If we move the acking responsiblity to Source reader, that would be much cleaner and we could make it synchronous

@@ -0,0 +1,181 @@
/*
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The file extension is not correct, also Maybe we rename it EmulatedPubSubSourceV2Test

*
*/

package org.apache.flink.connector.gcp.pubsub;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It should be enough to add to e2e tests module

import static org.junit.Assert.assertTrue;

/** Test of {@link PubSubSource} against the GCP Pub/Sub emulator SDK. */
public class EmulatedPubSubSourceTest extends GCloudUnitTestBase {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should use Junit5

@markma0215
Copy link

could I know when this PR will be merged and release in which version? thx

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

8 participants